package org.budo.warehouse.logic.producer;

import java.util.List;

import javax.annotation.Resource;

import org.budo.canal.server.BudoCanalServer;
import org.budo.support.dao.page.Page;
import org.budo.support.spring.context.aware.RootApplicationContextRefreshedEventListener;
import org.budo.warehouse.logic.bean.DynamicBeanService;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.api.IPipelineService;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * @author limingwei
 */
@Component
public class DataProducerLoader extends RootApplicationContextRefreshedEventListener {
    @Resource
    private IDataNodeService dataNodeService;

    @Resource
    private IPipelineService pipelineService;

    @Resource
    private BudoCanalServer budoCanalServer;

    @Resource
    private DynamicBeanService dynamicBeanService;

    /**
     * 为各种入口建立监听
     */
    @Override
    public void onRootApplicationContextRefreshedEvent(ContextRefreshedEvent contextRefreshedEvent) {
        List<DataNode> sourceDataNodes = dataNodeService.listSourceDataNodes(Page.max());

        for (DataNode sourceDataNode : sourceDataNodes) {
            String url = sourceDataNode.getUrl();
            if (url.startsWith("jdbc:mysql://")) {
                dynamicBeanService.canalDataProducer(sourceDataNode);
                continue;
            }

            if (url.startsWith("async:kafka://") || url.startsWith("async:activemq://")) { // 192.168.4.253:9092 tcp://192.168.4.251:61616
                List<Pipeline> pipelines = pipelineService.listBySourceDataNode(sourceDataNode.getId(), Page.max());
                for (Pipeline pipeline : pipelines) {
                    dynamicBeanService.asyncDataProducer(sourceDataNode, pipeline);
                }
                continue;
            }

            throw new RuntimeException("#58 不支持的数据源头节点, url=" + url);
        }

        budoCanalServer.start();
    }
}